package rocketmq

import (
	"context"
	"gitee.com/yaodae/server-lib-go/common"
	"gitee.com/yaodae/server-lib-go/config"
	"gitee.com/yaodae/server-lib-go/log"
	"gitee.com/yaodae/server-lib-go/utils"
	"strings"
	"sync"
	"time"
)

// Package-level state shared by Init and the publish helpers in this file.
var (
	// DEBUG starts as true and is set to false by Init once it has finished
	// initializing with a non-nil config.
	DEBUG            = true
	// reconnectionTime records when producers were last (re)created; it is
	// set at the end of Init and after a producer re-creation in PublishMsg.
	reconnectionTime time.Time
	// mutex serializes the reconnect-and-retry path in PublishMsg.
	mutex            sync.Mutex
	// curMqConfig is the client configuration built by Init.
	curMqConfig      *MqConfig
	// curProducersCfg holds the producer configs Init derives from the
	// configured producer topic list; PublishMsg searches it by Name.
	curProducersCfg  []*MqProducerConfig
	// curConsumersCfg holds the consumer configs Init derives from the
	// configured consumer topic list.
	curConsumersCfg  []*MqConsumerConfig
)

// Init builds the package-level RocketMQ configuration from cfg and then
// initializes the client, one producer per producer topic and one consumer
// per consumer topic.
//
// cfg.MqProducerTopics and cfg.MqConsumerTopics are comma-separated topic
// lists; each topic is used both as the Name and the Topic of its entry.
// A nil cfg is treated as "RocketMQ not configured" and nothing is done.
//
// Init always returns true.
func Init(cfg *config.RocketmqConfig) bool {
	if cfg == nil {
		return true
	}
	curMqConfig = &MqConfig{
		Endpoint:  cfg.MqEndpoint,
		AccessKey: cfg.MqAccessKey,
		SecretKey: cfg.MqSecretKey,
	}

	// strings.Split yields [""] for an empty list and empty elements for
	// stray commas. Drop those here so that no empty-topic entry is stored
	// and later matched by name (e.g. by the reconnect path in PublishMsg).
	producers := make([]*MqProducerConfig, 0)
	for _, topic := range strings.Split(cfg.MqProducerTopics, ",") {
		if utils.IsEmpty(topic) {
			continue
		}
		producers = append(producers, &MqProducerConfig{Name: topic, Topic: topic, InstanceId: cfg.MqInstanceId})
	}
	consumers := make([]*MqConsumerConfig, 0)
	for _, topic := range strings.Split(cfg.MqConsumerTopics, ",") {
		if utils.IsEmpty(topic) {
			continue
		}
		consumers = append(consumers, &MqConsumerConfig{Name: topic, Topic: topic, Group: cfg.MqGroup, InstanceId: cfg.MqInstanceId})
	}
	curProducersCfg = producers
	curConsumersCfg = consumers

	// Initialize the client first, then the producers, then the consumers.
	InitMq(curMqConfig)
	for _, producerCfg := range curProducersCfg {
		InitProducer(producerCfg)
	}
	for _, consumerCfg := range curConsumersCfg {
		InitConsumer(consumerCfg)
	}

	// Start the reconnect cooldown used by PublishMsg from this moment.
	reconnectionTime = time.Now()
	DEBUG = false
	return true
}

// PublishMsg sends msg through the producer registered under name.
//
// If the first attempt fails, the producer(s) configured with that name are
// re-created and the send is retried once, all while holding the package
// mutex. Re-creation is rate limited: when the last (re)connection happened
// less than five minutes ago, the original result and error are returned
// without retrying.
//
// It returns the result of the last ExecutePublishMsg call that was made.
func PublishMsg(name string, msg MqMsg) (bool, error) {
	// Minimum interval between two producer re-creations.
	const reconnectCooldown = 5 * time.Minute

	flag, err := ExecutePublishMsg(name, msg)
	if err == nil {
		return flag, err
	}

	mutex.Lock()
	defer mutex.Unlock()

	// time.Since uses the monotonic clock reading carried by time.Now, so
	// the cooldown is not distorted by wall-clock adjustments.
	if time.Since(reconnectionTime) < reconnectCooldown {
		return flag, err
	}

	// The send failed: re-create the matching producer(s) before retrying.
	for _, cfg := range curProducersCfg {
		if cfg.Name == name {
			InitProducer(cfg)
		}
	}
	reconnectionTime = time.Now()
	return ExecutePublishMsg(name, msg)
}

// ConsumeMsg looks up the consumer registered under name and hands it to
// ExecuteConsumeMsg together with numOfMessages, waitSeconds and consumeFunc,
// all forwarded unchanged. When no consumer is registered under name, a debug
// message is logged and the call returns without consuming anything.
func ConsumeMsg(name string, numOfMessages int32, waitSeconds int64, consumeFunc func(topic string, msg MqMsg) *common.LResult) {
	if consumer := GetConsumer(name); consumer != nil {
		ExecuteConsumeMsg(name, consumer, numOfMessages, waitSeconds, consumeFunc)
		return
	}
	log.Debug("rocketmq: consumer not found")
}

// ConsumeMsg2 consumes messages through an MqWorker: it creates a worker
// with a background context and the given name, numOfMessages, waitSeconds
// and consumeFunc, then calls its Run method.
func ConsumeMsg2(name string, numOfMessages int32, waitSeconds int64, consumeFunc func(topic string, msg MqMsg) *common.LResult) {
	ctx := context.Background()
	worker := NewMqWorker(ctx, name, numOfMessages, waitSeconds, consumeFunc)
	worker.Run()
}
